Databricks

Supported Databricks Runtime versions: 12.2 - 16.4 (Scala 2.12)

Note: Databricks Serverless is not supported by this instrumentation. You can optionally use the dbt agent instead.

To enable integration with definity on Databricks, follow these steps:

  1. Add the Spark Agent JAR to your compute cluster.
  2. Configure jobs or tasks to track with definity.

Cluster Configuration

1. Create an Init Script

Create a script to download and add the definity Spark agent to the cluster’s CLASSPATH and set the default definity parameters. Save this script in cloud storage (e.g., S3).

definity_init.sh
#!/bin/bash
# Download the definity Spark agent JAR and put it on the cluster's CLASSPATH
JAR_DIR="/databricks/jars"
mkdir -p "$JAR_DIR"
DEFINITY_JAR_URL="https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar"
curl -o "$JAR_DIR/definity-spark-agent.jar" "$DEFINITY_JAR_URL"
export CLASSPATH=$CLASSPATH:$JAR_DIR/definity-spark-agent.jar

# Set the default definity Spark parameters for sessions created on this cluster
cat > /databricks/driver/conf/00-definity.conf << EOF
spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin
spark.definity.server="https://app.definity.run"
spark.definity.api.token=YOUR_TOKEN
#spark.definity.env.name=YOUR_DEFAULT_ENV
EOF
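
Upload the script to the cloud storage location you will reference in the next step, for example with the AWS CLI (the bucket and prefix below are the placeholders used in step 2):

aws s3 cp definity_init.sh s3://your-s3-bucket/init-scripts-dir/definity_init.sh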

2. Attach the Init Script to Your Compute Cluster

In the Databricks UI:

  1. Go to Cluster configuration > Advanced options > Init Scripts.
  2. Add your script with:
    • Source: s3
    • File path: s3://your-s3-bucket/init-scripts-dir/definity_init.sh
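
If you create clusters through the Clusters API or infrastructure-as-code rather than the UI, the same script can be attached with an init_scripts entry roughly like the following sketch (the region value is an example, not from this guide):

"init_scripts": [
  {
    "s3": {
      "destination": "s3://your-s3-bucket/init-scripts-dir/definity_init.sh",
      "region": "us-east-1"
    }
  }
]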

3. Configure Spark Cluster Name [Optional]

By default, the compute name is taken from the Databricks cluster name. To override it:

Navigate to Cluster configuration > Advanced options > Spark and add:

spark.definity.compute.name      my_cluster_name

Note: These settings affect the default Spark session created by the cluster. Definity will monitor this session automatically.
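
To verify that the plugin configuration was picked up, you can read it back from a notebook attached to the cluster, for example:

spark.conf.get("spark.plugins")  # expected to return ai.definity.spark.plugin.DefinitySparkPlugin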


Job Tracking

Definity offers several tracking modes on Databricks to accommodate different workflow patterns.

Multi-Task Workflow [Default]

When running Databricks workflows with multiple tasks on a cluster, Definity tracks the compute cluster separately from the logical tasks and automatically detects parameters from your workflow:

  • Pipeline name: Derived from the Databricks job name
  • Task name: Derived from the Databricks task key
  • PIT (Point in Time): Set to the timestamp when the run starts

Single-Task Cluster

For clusters running only one task, you may prefer to create a single Definity tracking session for clarity.

To enable this mode, disable automatic tracking:

spark.definity.databricks.automaticSessions.enabled=false

Then configure the following parameters: spark.definity.pipeline.name, spark.definity.pipeline.pit, and spark.definity.task.name.

You can set these in the cluster configuration or as shown in the examples below.
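
For example, in the cluster's Spark config (the values shown are placeholders):

spark.definity.databricks.automaticSessions.enabled   false
spark.definity.pipeline.name      my_pipeline
spark.definity.pipeline.pit       2025-01-01 01:00:00
spark.definity.task.name          my_task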

Programmatic Multi-Task Cluster

You can manually define task scopes within your code. First, disable automatic tracking:

spark.definity.databricks.automaticSessions.enabled=false

Start Logical Task Tracking

# Set this property to define a new task scope
spark.conf.set("spark.definity.session", f"pipeline.name={my_pipeline},pipeline.pit={pit_date},task.name={my_task}")

Stop Logical Task Tracking

For multiple logical tasks within a single session, unset the property when each task completes:

try {
  // your job logic here
  ...
} finally {
  // Signal task completion (recommended in a `finally` block to catch failures)
  spark.conf.unset("spark.definity.session")
}

Note: Unsetting the session is not required for Python script jobs and notebook jobs.
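
Putting the two steps together, here is a minimal PySpark sketch of tracking two logical tasks back to back in one cluster session. It assumes the default `spark` session provided by Databricks; the pipeline name, PIT, task names, and `run_task` are placeholders for your own values and logic:

pipeline = "my_pipeline"
pit = "2025-01-01 01:00:00"

for task_name in ["task1", "task2"]:
    # Open a definity tracking scope for this logical task
    spark.conf.set(
        "spark.definity.session",
        f"pipeline.name={pipeline},pipeline.pit={pit},task.name={task_name}",
    )
    try:
        run_task(task_name)  # placeholder for your task logic
    finally:
        # Close the scope so the next task is tracked separately
        spark.conf.unset("spark.definity.session")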

Job Configuration Examples

Example: Jobs API

Definity parameters can be passed via the base_parameters or parameters fields, depending on the task type.

{
  "tasks": [
    {
      "task_key": "task1",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_1",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task1"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "task2",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_2",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task2"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "python_task1",
      "spark_python_task": {
        "python_file": "s3://my-bucket/python_task.py",
        "parameters": [
          "yourArg1",
          "yourArg2",
          "spark.definity.task.name=python_task_1",
          "spark.definity.pipeline.name=my_pipeline",
          "spark.definity.pipeline.pit=2025-01-01 01:00:00"
        ]
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    }
  ],
  "format": "MULTI_TASK",
  "queue": {
    "enabled": true
  }
}
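
A payload like the one above can be submitted directly to the Jobs API, for example with curl (the workspace host and token are placeholders; this sketch assumes Jobs API 2.1):

curl -X POST "https://<your-workspace-host>/api/2.1/jobs/create" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN" \
  -H "Content-Type: application/json" \
  --data @create_job.json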

Example: Airflow Notebook Job

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

run_notebook = DatabricksSubmitRunOperator(
    task_id="run_notebook",
    json={
        "notebook_task": {
            "notebook_path": "/Users/[email protected]/my_notebook",
            "base_parameters": {
                "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit": "{{ ts }}",
                "spark.definity.task.name": "{{ ti.task_id }}"
            },
        },
        "name": "notebook-job",
    }
)

Example: Airflow Python Job

run_python = DatabricksSubmitRunOperator(
    task_id="run_python_script",
    json={
        "spark_python_task": {
            "python_file": "dbfs:/path/to/job.py",
            "parameters": [
                "spark.definity.pipeline.name={{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit={{ ts }}",
                "spark.definity.task.name={{ ti.task_id }}"
            ]
        },
        "name": "python-job",
    }
)